[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources#3844
[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources#3844linjianchang wants to merge 1 commit into
Conversation
| stream = stream.union(streamBranch); | ||
| } | ||
| } | ||
| boolean isParallelMetadataSource = dataSource.isParallelMetadataSource(); |
There was a problem hiding this comment.
I think multi data sources should be regarded as parallelized.
There was a problem hiding this comment.
Already modified
|
|
||
| ```yaml | ||
| source: | ||
| type: mysql_mutiple |
There was a problem hiding this comment.
Should we use a new key like 'sources' to describe multiple sources? The '_multiple' suffix in value seems a bit odd. Because the YAML content does not correspond one-to-one with the PipelineDef.
There was a problem hiding this comment.
Already modified
There was a problem hiding this comment.
sources:
- type: mysql
name: mysql-instance-00
hostname: localhost
port: 3306
....
- type: mysql
name: mysql-instance-01
hostname: localhost
port: 3307
....And the corresponding PipelineDef looks like this:
public class PipelineDef {
@Nullable private List<SourceDef> sources;
private final SourceDef source;
...
}If the sources is not null then we use these data sources, otherwise we use source to build up DataStream. In this way, the previous usage will not be affected.
I want to hear your opinion. @yuxiqian
There was a problem hiding this comment.
I do like @ChaomingZhangCN's proposed syntax for a fully multiple data source, they're intuitive and expressive, but might be a chore if users just want to connect to a MySQL cluster with multiple servers, as they have to copy all identical configurations to both source definition.
@linjianchang's solution for now seems like MySQL specific, especially for multi-host clusters. It could not be extended for hetero-sources (like concatenating data from different DBMS), or when one wants to use different configs for each node. These cases don't exist for now since all we have is MySQL source connector, but as we're modifying composer and YAML API (instead of MySQL connector itself), such possibility should be discussed more carefully.
As for multiple sources in pipeline itself, I remembered the idea has been informally discussed with @leonardBang and @PatrickRen long time ago, and the conclusion was running multiple sources in one single job actually makes the pipeline more fragile, since any single-point failure would easily escalate and cause a global failover. Things might have changed since then, still needs hearing from senior developers on this.
There was a problem hiding this comment.
sources: - type: mysql name: mysql-instance-00 hostname: localhost port: 3306 .... - type: mysql name: mysql-instance-01 hostname: localhost port: 3307 ....And the corresponding PipelineDef looks like this:
public class PipelineDef { @Nullable private List<SourceDef> sources; private final SourceDef source; ... }If the
sourcesis not null then we use these data sources, otherwise we usesourceto build up DataStream. In this way, the previous usage will not be affected. I want to hear your opinion. @yuxiqian @ChaomingZhangCN
It has been modified according to comments, please review it again, thanks!
| private static final String HOST_NAME = "hostname"; | ||
| private static final String PORT = "port"; | ||
| private static final String COLON = ":"; | ||
| private static final String MUTIPLE = "_mutiple"; |
There was a problem hiding this comment.
Should be _multiple.
There was a problem hiding this comment.
Already modified
There was a problem hiding this comment.
@ChaomingZhangCN Hi,please help review when you have time,thanks!
f8524d7 to
994d17a
Compare
994d17a to
c501ce8
Compare
c501ce8 to
8aad734
Compare
8aad734 to
cc561c0
Compare
f6862fe to
fbc1b2b
Compare
fbc1b2b to
8edc345
Compare
|
|
||
| - type: mysql | ||
| name: MySQL multiple Source2 | ||
| hostname: 127.0.0.2 |
There was a problem hiding this comment.
It is essential to verify state compatibility.
Would adding or removing a Source from an existing job lead to state compatibility issues, or could reordering the Sources result in state inconsistency? Such incompatible operations must be exposed as errors to avoid silent handling.
There was a problem hiding this comment.
It is essential to verify state compatibility. Would adding or removing a Source from an existing job lead to state compatibility issues, or could reordering the Sources result in state inconsistency? Such incompatible operations must be exposed as errors to avoid silent handling.
@lvyanquan Thanks for the review!I have already made the modifications and tested them. Please refer to the following document
https://www.yuque.com/oniucium/lfk1te/yruqcmf9vbz4rxzv?singleDoc# 《multiple source从状态恢复验证》
59898ec to
2225400
Compare
57eb8fa to
dd6e0b1
Compare
|
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs. |
pipeline cdc connector support multiple data sources
The relevant design and verification documents are as follows:
https://www.yuque.com/oniucium/lfk1te/ocp6m13kpgh1x9pg?singleDoc#